iT邦幫忙

2023 iThome 鐵人賽

DAY 28
0
Software Development

Python十翼:與未來的自己對話系列 第 28

[Day28] 末翼 - Term Projects:Project ECC - 建立EdgeDB Cloud Connection(2)

  • 分享至 

  • xImage
  •  

streamlit app目標

使用streamlit建立一個可以輸入EdgeQLquery_argsquery_kwargsform,並於submit之後,傳送queryEdgeDB Cloud執行(註1)。

下面是我們錄製一小段操作app的過程,可以點選圖片觀看。
ecc-demo-video

streamlit app架構

ECC
├── ...
├── st_comps.py
├── st_data_structures.py
├── st_utils.py
└── streamlit_app.py

st_comps.py

為擺放各種streamlit的widget及element的檔案。

st_data_structures.py

只有一個FormContentNamedTuple,於收集form內容時使用。

st_utils.py

內有著許多小工具,我們挑選幾個比較重要的function來說明。

get_loop_dictget_conn_dict

皆使用@st.cache_resource裝飾。這麼一來,當任何session一進來,都可以取得同一個loop_dictconn_dict,可以幫助我們使用現在的timestamp與存在其中的timestamp進行對比,進而執行想要的操作。

@st.cache_resource
def get_loop_dict() -> dict[str, Any]:
    return {}


@st.cache_resource
def get_conn_dict() -> dict[str, Any]:
    return {}

_routine_clean

會取得現在的timestamp,以此計算get_loop_dictget_conn_dict中是否有超過thresholdloopconn。相當於每次呼叫streamlit_app.py時,會定時清除閒置過久的資源。

def _routine_clean(excluded_tokens: list[str],
                   threshold: float = 300) -> None:
    cur_ts = get_cur_ts()

    ld = get_loop_dict()
    to_del_loop_tokens = {t
                          for t, (_, loop_ts) in ld.items()
                          if cur_ts - loop_ts > threshold}
    for _token in excluded_tokens:
        to_del_loop_tokens.discard(_token)
    for k in to_del_loop_tokens:
        try:
            del ld[k]
        except Exception as ex:
            st.toast(f'{ex=} happened in del loops', icon="🚨")

    cd = get_conn_dict()
    to_del_conn_tokens = {t
                          for t, (_, conn_ts) in cd.items()
                          if cur_ts - conn_ts > threshold}
    for _token in excluded_tokens:
        to_del_conn_tokens.discard(_token)
    for k in to_del_conn_tokens:
        try:
            del cd[k]
        except Exception as ex:
            st.toast(f'{ex=} happened in del conns', icon="🚨")

_populate_qry_args

將接收到的str;分隔,接著檢查各個arg_str是否為支援的型態。如果是的話,嘗試使用eval(arg_str)取得轉換後的型態,再appendqry_args,最後返回return tuple(qry_args)

def _populate_qry_args(qry_args_str: str) -> tuple[Any, ...]:
    qry_args: list[Any] = []
    for arg_str in qry_args_str.split(';'):
        if arg_str.strip() and \
                isinstance(arg_str, (str, datetime.date, datetime.datetime)):
            try:
                eval_arg = eval(arg_str)
            except SyntaxError as e:
                st.warning(
                    'Can not parse the positional query arguments!')
                raise e
            else:
                qry_args.append(eval_arg)
    return tuple(qry_args)

_populate_qry_kwargs

將接收到的str;分隔,接著嘗試使用exec(kwarg_str.strip(), globals(), qry_kwargs),將kwarg_strpopulate到qry_kwargs中,並於最後返回qry_kwargs

def _populate_qry_kwargs(qry_kwargs_str: str) -> dict[str, Any]:
    qry_kwargs: dict[str, Any] = {}
    for kwarg_str in qry_kwargs_str.split(';'):
        try:
            exec(kwarg_str.strip(), globals(), qry_kwargs)
        except SyntaxError as e:
            st.warning(
                'Can not parse the named query arguments!')
            raise e
    return qry_kwargs

_convert_form_to_record

form接收的內容轉換為QueryRecord格式。

我們曾經想在_receive_required_single中的st.radio使用Enum,但streamlit常會報錯,所以只好接收str型態再使用convert_str_to_required_single轉為Enum

def _convert_form_to_record(form: FormContent) -> QueryRecord:
    qry = form.qry
    extra_args = _populate_qry_args(form.qry_args_str)
    jsonify = convert_bool_to_jsonify(form.jsonify)
    required_single = convert_str_to_required_single(form.required_single)
    extra_kwargs = _populate_qry_kwargs(form.qry_kwargs_str)
    task_name = uuid.uuid4().hex[:6]

    return QueryRecord(qry,
                       extra_args,
                       jsonify,
                       required_single,
                       extra_kwargs,
                       task_name)

_create_task_from_form

使用傳入的tg來新增task

async def _create_task_from_form(tg: asyncio.TaskGroup,
                                 conn: EdgeDBCloudConn,
                                 form: FormContent,
                                 tasks: set[asyncio.Task[Any]]) -> None:
    record = _convert_form_to_record(form)
    async with conn:
        task = tg.create_task(conn.query(record.qry,
                                         *record.extra_args,
                                         jsonify=record.jsonify,
                                         required_single=record.required_single,
                                         **record.extra_kwargs),
                              name=record.task_name)
        tasks.add(task)

streamlit_app.py

建立loop

我們希望多個session能同時獨立操作,所以需要針對每個session建立loopconn,無法簡單的呼叫asyncio.run就好。由於loopconn都需要在每個session一開始就確定下來,所以如果將_prepare_loop_prepare_conn移至其它檔案會出現問題。

...
import nest_asyncio

nest_asyncio.apply()

if 'token' not in st.session_state:
    token = generate_token()
    logging.info(f'Generating token: {token}')
    st.session_state['token'] = token


if __name__ == '__main__':
    cur_ts = get_cur_ts()
    token = st.session_state.token
    excluded_tokens = [token]

    loop = _prepare_loop(cur_ts, token)
    conn = _prepare_conn(cur_ts, token)

    _display_res(token, loop, conn, excluded_tokens)
    _routine_clean(excluded_tokens)

    asyncio.set_event_loop(loop)
    loop.run_until_complete(run(main, conn, token))
  • 於每個session一開始時,呼叫generate_token產生獨特的token,並儲存於st.session_state中。
  • 呼叫_prepare_loop準備loop
  • 呼叫_prepare_conn準備conn
  • 呼叫_display_res於最上方顯示resource,並生成refreshtry free resource兩個button。
  • 呼叫_routine_clean定時清理loopconn
  • 透過asyncio.set_event_loop設定loop
  • 透過loop.run_until_complete執行run,啟動event loop。

_prepare_loop

  • 透過get_loop_dict取得loop_dict
  • 接著檢查token是否在loop_dict中。如果不在的話,呼叫asyncio.new_event_loop建立一個新loop;如果在的話,從中取出loop
  • 透過loop_dict[token] = (loop, cur_ts)更新timestamp
  • 最後返回loop
def _prepare_loop(cur_ts: int, token: str) -> asyncio.AbstractEventLoop:
    loop_dict = get_loop_dict()
    if token not in loop_dict:
        loop = asyncio.new_event_loop()
    else:
        loop, _ = loop_dict[token]
    loop_dict[token] = (loop, cur_ts)
    return loop

_prepare_conn

  • 透過get_conn_dict取得conn_dict
  • 接著檢查token是否在conn_dict中。如果不在的話,呼叫asyncio.new_event_loop建立一個新conn;如果在的話,從中取出conn
  • 透過conn_dict[token] = (conn, cur_ts)更新timestamp
  • 最後返回conn
def _prepare_conn(cur_ts: int, token: str) -> EdgeDBCloudConn:
    conn_dict = get_conn_dict()
    if token not in conn_dict:
        conn = EdgeDBCloudConn(**load_st_toml())
    else:
        conn, _ = conn_dict[token]
    conn_dict[token] = (conn, cur_ts)
    return conn

run

asyncio所執行的coroutine,其內為一個try-except*-else結構。

  • try中,使用asyncio.TaskGroupapp整體布局的algo(即main function)加入到task。請注意,這邊我們用到[Day26]新學到的技巧,來將tg往下傳給algo。這麼一來,除了當前這個task外,algo內也可以新增其它的task
  • except* Exception as ex中,經過render後,印出各Exception的錯誤資訊。
  • else中,呈現query的結果。
...
tasks = set()

async def run(algo, conn: EdgeDBCloudConn, token: str) -> None:
    top_name = 'top'
    try:
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(algo(tg, conn, token), name=top_name)
            tasks.add(task)
    except* Exception as ex:
        for exc in ex.exceptions:
            st.warning(f'Exception: {type(exc).__name__}')
            _render_exception(exc)
    else:
        for task in tasks:
            if (task_name := task.get_name()) != top_name:
                st.write(f'task_name: {task_name}')
                _render_result(task.result())

main

為整個app的layout

  • 呼叫_display_sidebar,建立sidebar element
  • 呼叫_get_query_form,建立form element
  • formsubmit時,呼叫_create_tasks_for_form搜集form中各項資料並整理後,建立query task
  • 呼叫_display_big_red_btn_and_db_calls,建立一個清除conn的小工具。
...

async def main(tg: asyncio.TaskGroup, conn: EdgeDBCloudConn, token: str) -> None:
    _display_sidebar()
    form = _get_query_form()
    if form.submitted:
        await _create_tasks_for_form(tg, conn, form)
    _display_big_red_btn_and_db_calls(conn, token)

部署至streamlit cloud

streamlit cloud可以部署無限制的public app及一個private app

您可以使用private app或是使用public app加上authenticator來部署。

部署的過程很簡單,只需完成下列各個選項。所有的credentials可以置於Advanced settings中,並於app內使用st.secrets存取。這相當於local開發時的.streamlit/secrets.toml

streamlit-cloud

後記

這個project還有非常多可以改進的地方,例如:

  • get_loop_dictget_conn_dict這樣的方法,在連線數較少時可以使用,但是當連線數較多時,記憶體使用量也會增加不少。或許我們可以將其轉為其它格式,例如pickle,然後使用另一個背景程式來定時改動及讀取pickle
  • 目前的Try Free Res是可以清除掉所有threshold大於3loopsconns
  • _populate_qry_args_populate_qry_kwargs需要使用比evalexec更安全的處理方式。
  • 嘗試其它前端工具。
  • mocking引入tests
  • type annotation還有很大的進步空間。
  • 這個app是設計以asyncio長期等待task,所以db connection不需關閉。但當遇到shutdown時,如何有效關閉所有的connections,還需要好好想想。
  • ...

儘管如此,還是學習到了很多,例如:

  • 更加了解EdgeDB-Python的各種功能。
  • 使用新的asyncio.TaskGroupExceptionGroup來處理asyncio問題。
  • 更加熟悉structural pattern matching的技巧。
  • 第一次嘗試將streamlit各個compoment分開,而不是全部擠在streamlit_app.py
  • ...

備註

註1:EdgeDB Cloud已有很好的UI操控介面可以使用,本日內容純屬自我練習之用。

註2:需使用nest_asyncio.apply來防止更新loopconn時,容易出現的RuntimeError: Task <Task pending name='Task-xxx' ...> attached to a different loop

Code

本日程式碼傳送門


上一篇
[Day27] 末翼 - Term Projects:Project ECC - 建立EdgeDB Cloud Connection(1)
下一篇
[Day29] 末翼 - Term Projects:Project Postman - 研究如何傳遞decorator factory之參數
系列文
Python十翼:與未來的自己對話30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言